fix(commhub): SSE backpressure + half-open detection (round-2/4 ①)#271
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: bfe61ca8df
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if (desired < -MAX_QUEUE_BACKPRESSURE_BYTES) { | ||
| closeClient(client, `queue-overflow-${Math.abs(desired)}b`); |
There was a problem hiding this comment.
Configure the SSE queue to measure bytes
In createSSEStream the ReadableStream is created with the default queuing strategy, so controller.desiredSize is reduced per queued chunk rather than by Uint8Array.byteLength. Comparing that chunk-count value to MAX_QUEUE_BACKPRESSURE_BYTES means the intended 1 MB hard cap actually allows roughly 1,000,000 queued SSE chunks, and a half-open client receiving frequent events can still accumulate far more memory than the configured byte ceiling before this branch closes it. Add a byte-length queuing strategy or track queued byte sizes explicitly before enforcing this limit.
Useful? React with 👍 / 👎.
Issue #1: the public hub has no per-client cap on the SSE ReadableStream queue. A half-open consumer (TCP drop, client crash without FIN) keeps its slot in `clients` AND keeps receiving `controller.enqueue()` calls from both the keepalive timer and every `pushEvent`. The bytes pile up in the stream's internal queue forever — clear OOM vector for any commhub-server exposed to the public internet (see #270 round-2/4). Issue #2 (CHANGE_REQ in v1, fixed in v2): default web-streams queuing strategy counts CHUNKS, not bytes — so a `desiredSize < -1_000_000` threshold meant "one million chunks queued", not "1 MB queued". A single 1 MiB enqueue bumps desiredSize by -1 under the default strategy. v2 explicitly constructs the SSE stream with a byte-counting strategy so the byte-cap actually fires: new ReadableStream<Uint8Array>(src, { highWaterMark: 0, size: (chunk) => chunk.byteLength, }) server/src/push.ts - Byte-counting queuing strategy on the SSE ReadableStream (the critical fix for v1's defeated byte-cap). - `tryEnqueueBytes(client, bytes)` — every enqueue path through one guard. Two thresholds: a. HARD CEILING: `desiredSize < -MAX_QUEUE_BACKPRESSURE_BYTES` (1 MB default) → close immediately. One huge event can't blow up a stuck client. b. STUCK TIMEOUT: `desiredSize < 0` continuously for `STUCK_CLOSE_MS` (60s default) → close. - `controller.desiredSize === null` treated as no headroom → close. - `sweepLiveness()` runs every `LIVENESS_SWEEP_MS` (15s default) so half-open detection doesn't require event traffic. - `closeClient(client, reason)` idempotent, clears keepalive timer, best-effort close. All log lines include reason. - All thresholds env-overridable: ANET_SSE_MAX_QUEUE_BYTES, ANET_SSE_STUCK_CLOSE_MS, ANET_SSE_KEEPALIVE_MS, ANET_SSE_LIVENESS_SWEEP_MS. - Liveness timer uses `.unref()`. server/src/push.test.ts (26 tests total — 21 fake-controller + 3 real-ReadableStream + 2 pre-existing rekey) - Fake-controller tests cover tryEnqueueBytes/sweepLiveness/pushEvent branches with mockable desiredSize. - Real-ReadableStream tests (new in v2): * byte-strategy unit-check: enqueue 1 KB → desiredSize drops by ~1024 (not by 1). The exact assertion that catches the v1 unit- mismatch bug. * non-reading consumer past MAX bytes → hard-ceiling close fires. * reading consumer drains promptly → no spurious close even after many pushes. Test plan - `bun test src/push.test.ts` → 26/26 pass - `COMMHUB_DB=/tmp/test bun test` → 142/143 pass (1 fail is the pre- existing missing @modelcontextprotocol/sdk dev dep, unrelated) - No production deploy. COMMHUB_DB overridden to /tmp/* for tests. - Per 通信龙 dispatch: PR + 通信牛 review gate, no self-merge. - v1 caught by 通信牛 review (Bun probe: default-strategy desiredSize went 1→0 after 1 MiB chunk; byte-strategy went -1048576). Thanks 通信牛 for the close call.
bfe61ca to
2c26d36
Compare
Author
Agent: 通信SDK马
Summary
Round-2/4 review ① per 通信龙 dispatch 2026-06-28 — highest-severity unowned item.
server/src/push.tshad no per-client cap on the SSE ReadableStream queue. A half-open consumer (TCP drop, client crash without FIN) keeps its slot inclientsAND keeps receivingcontroller.enqueue()calls from both the keepalive timer and everypushEvent. Bytes pile up in the stream's internal queue forever — clear OOM vector once exposed to the public internet (any commhub-server hosted at<hub-domain>reachable over WAN).v1 → v2 (post-review fix)
v1 had a unit-mismatch bug caught by 通信牛: the default web-streams queuing strategy counts chunks, not bytes. So
desiredSize < -1_000_000meant "1 million chunks queued", not "1 MB queued" — a 1 MiB single enqueue only nudges desiredSize by-1under the default strategy. The byte-cap was effectively defeated.v2 fix: construct the SSE stream with an explicit byte-counting strategy so the byte-cap actually fires:
通信牛's Bun probe (verified locally in the v2 test suite): default-strategy desiredSize goes
1 → 0after a 1 MiB chunk and-1after 2 MiB; byte-strategy goes0 → -1048576after 2 MiB.What changed
server/src/push.tstryEnqueueBytes(client, bytes)— every enqueue path goes through this guard. Two thresholds:desiredSize < -MAX_QUEUE_BACKPRESSURE_BYTES(1 MB default) → close stream immediately. One huge event can't blow up a stuck client.desiredSize < 0continuously forSTUCK_CLOSE_MS(60s default) → close. Catches slow-trickle leaks.controller.desiredSize === nulltreated as no headroom → close. Defends against runtime errored stream.sweepLiveness()runs everyLIVENESS_SWEEP_MS(15s default) so half-open detection doesn't require event traffic.closeClient(client, reason)idempotent, clears keepalive timer, best-effort close. All log lines include reason for ops triage.connectedframe goes throughtryEnqueueBytestoo.ANET_SSE_MAX_QUEUE_BYTES,ANET_SSE_STUCK_CLOSE_MS,ANET_SSE_KEEPALIVE_MS,ANET_SSE_LIVENESS_SWEEP_MS..unref()so it doesn't hold the event loop open.Tests
The 1 fail is the pre-existing missing
@modelcontextprotocol/sdkdev dep — unrelated to this change, tracked in #261 / round-4 dependency hygiene.26 tests:
createSSEStream uses byte-counting strategy: enqueue 1 KB → desiredSize drops by ≥1024 (not by 1).non-reading consumer past MAX bytes triggers hard ceiling close: pushes that overshoot 1 MB total bytes close the client. Under v1's strategy this test would NOT close (each push = 1 chunk).reading consumer drains queue → no spurious close: drain promptly, no false-positive close.Constraint compliance (per 通信龙 dispatch)
<hub-domain>or any prod DBOut of scope (queued — separate PRs)
/api/statusread-path write-amp → background staleness timer (task [release][P0] v0.10.0 — Direct Runtime + Observability Foundations 规划 #140)Test plan
bun test src/push.test.ts→ 26/26 green (includes 3 real-stream byte-cap probes)bun testbaseline 142/143 (1 pre-existing fail unrelated, see 系统 review round 1 — 可靠性/安全/Runtime/测试·CI·文档 findings #261)